Package org.apache.wayang.profiler.spark
Class SparkOperatorProfiler
- java.lang.Object
  - org.apache.wayang.profiler.spark.SparkOperatorProfiler
-
- Direct Known Subclasses:
BinaryOperatorProfiler, SinkProfiler, SparkSourceProfiler, SparkUnaryOperatorProfiler
public abstract class SparkOperatorProfiler extends java.lang.Object
Allows instrumenting a SparkExecutionOperator.
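The typical lifecycle is prepare(long...), run(), then cleanUp(). A minimal usage sketch, assuming the caller already holds a concrete subclass instance (the profileOnce helper is illustrative only):

    import org.apache.wayang.profiler.spark.SparkOperatorProfiler;

    // Illustrative helper: drive one complete profiling run.
    static SparkOperatorProfiler.Result profileOnce(SparkOperatorProfiler profiler,
                                                    long... inputCardinalities) {
        profiler.prepare(inputCardinalities);  // set up inputs for the run
        try {
            return profiler.run();             // execute and measure the operator
        } finally {
            profiler.cleanUp();                // always release profiling resources
        }
    }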
-
-
Nested Class Summary
static class SparkOperatorProfiler.Result
    The result of a single profiling run.
-
Field Summary
int cpuMhz
protected java.util.List<java.util.function.Supplier<?>> dataQuantumGenerators
protected long executionPaddingTime
protected FunctionCompiler functionCompiler
protected java.util.List<java.lang.Long> inputCardinalities
protected org.apache.logging.log4j.Logger logger
int numCoresPerMachine
int numMachines
int numPartitions
protected SparkExecutionOperator operator
protected java.util.function.Supplier<SparkExecutionOperator> operatorGenerator
protected SparkExecutor sparkExecutor
-
Constructor Summary
SparkOperatorProfiler(java.util.function.Supplier<SparkExecutionOperator> operatorGenerator, Configuration configuration, java.util.function.Supplier<?>... dataQuantumGenerators)
-
Method Summary
void cleanUp()
    Override this method to implement any clean-up logic.
protected static RddChannel.Instance createChannelInstance(org.apache.spark.api.java.JavaRDD<?> rdd, SparkExecutor sparkExecutor)
    Creates a ChannelInstance that carries the given rdd.
protected static RddChannel.Instance createChannelInstance(SparkExecutor sparkExecutor)
    Creates an empty ChannelInstance.
protected void evaluate(SparkExecutionOperator operator, ChannelInstance[] inputs, ChannelInstance[] outputs)
protected abstract SparkOperatorProfiler.Result executeOperator()
    Executes the profiling task.
protected <T> org.apache.spark.api.java.JavaRDD<T> partition(org.apache.spark.api.java.JavaRDD<T> rdd)
    If a desired number of partitions for the input JavaRDDs is requested, enforce it.
void prepare(long... inputCardinalities)
    Call this method before run() to prepare the profiling run.
protected abstract void prepareInput(int inputIndex, long inputCardinality)
protected <T> org.apache.spark.api.java.JavaRDD<T> prepareInputRdd(long cardinality, int inputIndex)
    Helper method to generate data quanta and provide them as a cached JavaRDD.
protected <T> org.apache.spark.api.java.JavaRDD<T> prepareInputRddInDriver(long cardinality, int inputIndex)
    Helper method to generate data quanta in the Spark driver and provide them as a cached JavaRDD.
protected <T> org.apache.spark.api.java.JavaRDD<T> prepareInputRddInWorker(long cardinality, int inputIndex)
    Helper method to generate data quanta in the Spark workers and provide them as a cached JavaRDD.
protected long provideCpuCycles(long startTime, long endTime)
    Estimates the CPU cycles spent in the cluster during the given time span by waiting for Ganglia to provide the respective information in its RRD files.
protected long provideDiskBytes(long startTime, long endTime)
    Estimates the disk bytes read and written in the cluster during the given time span by waiting for Ganglia to provide the respective information in its RRD files.
protected long provideNetworkBytes(long startTime, long endTime)
    Estimates the network bytes transferred in the cluster during the given time span by waiting for Ganglia to provide the respective information in its RRD files.
SparkOperatorProfiler.Result run()
    Executes and profiles the profiling task.
-
-
-
Field Detail
-
logger
protected final org.apache.logging.log4j.Logger logger
-
operatorGenerator
protected java.util.function.Supplier<SparkExecutionOperator> operatorGenerator
-
dataQuantumGenerators
protected final java.util.List<java.util.function.Supplier<?>> dataQuantumGenerators
-
cpuMhz
public int cpuMhz
-
numMachines
public int numMachines
-
numCoresPerMachine
public int numCoresPerMachine
-
numPartitions
public int numPartitions
-
executionPaddingTime
protected final long executionPaddingTime
-
operator
protected SparkExecutionOperator operator
-
sparkExecutor
protected SparkExecutor sparkExecutor
-
functionCompiler
protected final FunctionCompiler functionCompiler
-
inputCardinalities
protected java.util.List<java.lang.Long> inputCardinalities
-
-
Constructor Detail
-
SparkOperatorProfiler
public SparkOperatorProfiler(java.util.function.Supplier<SparkExecutionOperator> operatorGenerator, Configuration configuration, java.util.function.Supplier<?>... dataQuantumGenerators)
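The dataQuantumGenerators are plain Suppliers, one per input of the profiled operator. A hedged sketch of assembling the arguments (MyProfiler, createOperator(), and configuration are hypothetical placeholders):

    import java.util.Random;
    import java.util.function.Supplier;

    Random random = new Random(42);
    // One data quantum generator per operator input; here: random integers.
    Supplier<Integer> intQuanta = random::nextInt;
    // new MyProfiler(() -> createOperator(), configuration, intQuanta);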
-
-
Method Detail
-
prepare
public void prepare(long... inputCardinalities)
Call this method before run() to prepare the profiling run.
Parameters:
inputCardinalities - the number of input elements for each input of the profiled operator
-
prepareInput
protected abstract void prepareInput(int inputIndex, long inputCardinality)
-
prepareInputRdd
protected <T> org.apache.spark.api.java.JavaRDD<T> prepareInputRdd(long cardinality, int inputIndex)
Helper method to generate data quanta and provide them as a cached JavaRDD. Uses an implementation based on the wayang.profiler.datagen.location property.
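A hedged sketch of what this dispatch could amount to; the accepted property values and the getStringProperty accessor are assumptions, not taken from the source:

    // Assumed dispatch on the wayang.profiler.datagen.location property.
    String location = configuration.getStringProperty("wayang.profiler.datagen.location");
    return "worker".equalsIgnoreCase(location)
            ? this.prepareInputRddInWorker(cardinality, inputIndex)
            : this.prepareInputRddInDriver(cardinality, inputIndex);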
-
prepareInputRddInDriver
protected <T> org.apache.spark.api.java.JavaRDD<T> prepareInputRddInDriver(long cardinality, int inputIndex)
Helper method to generate data quanta in the Spark driver and provide them as a cached JavaRDD.
-
prepareInputRddInWorker
protected <T> org.apache.spark.api.java.JavaRDD<T> prepareInputRddInWorker(long cardinality, int inputIndex)
Helper method to generate data quanta in the Spark workers and provide them as a cached JavaRDD.
-
partition
protected <T> org.apache.spark.api.java.JavaRDD<T> partition(org.apache.spark.api.java.JavaRDD<T> rdd)
If a desired number of partitions for the input JavaRDDs is requested, enforce it.
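One plausible implementation sketch, assuming a non-positive numPartitions means the partitioning is left untouched; the choice of JavaRDD.repartition(int) is an assumption:

    import org.apache.spark.api.java.JavaRDD;

    protected <T> JavaRDD<T> partition(JavaRDD<T> rdd) {
        // Only reshuffle when a partition count was explicitly requested.
        return this.numPartitions <= 0 ? rdd : rdd.repartition(this.numPartitions);
    }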
-
run
public SparkOperatorProfiler.Result run()
Executes and profiles the profiling task. Requires that this instance is prepared.
-
provideCpuCycles
protected long provideCpuCycles(long startTime, long endTime)
Estimates the CPU cycles spent in the cluster during the given time span by waiting for Ganglia to provide the respective information in its RRD files.
-
provideNetworkBytes
protected long provideNetworkBytes(long startTime, long endTime)
Estimates the network bytes transferred in the cluster during the given time span by waiting for Ganglia to provide the respective information in its RRD files.
-
provideDiskBytes
protected long provideDiskBytes(long startTime, long endTime)
Estimates the disk bytes read and written in the cluster during the given time span by waiting for Ganglia to provide the respective information in its RRD files.
-
executeOperator
protected abstract SparkOperatorProfiler.Result executeOperator()
Executes the profiling task. Requires that this instance is prepared.
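A hypothetical skeleton of a single-input subclass, composed from the protected helpers documented above; how a Result is assembled from the measurements is not covered here, so that step is left out:

    @Override
    protected SparkOperatorProfiler.Result executeOperator() {
        // Wrap the generated input RDD and an empty output into channel instances.
        JavaRDD<Object> inputRdd = this.prepareInputRdd(this.inputCardinalities.get(0), 0);
        RddChannel.Instance input = createChannelInstance(inputRdd, this.sparkExecutor);
        RddChannel.Instance output = createChannelInstance(this.sparkExecutor);

        long start = System.currentTimeMillis();
        this.evaluate(this.operator, new ChannelInstance[]{input}, new ChannelInstance[]{output});
        long end = System.currentTimeMillis();

        long cpuCycles = this.provideCpuCycles(start, end);  // gathered via Ganglia
        // ... assemble a SparkOperatorProfiler.Result from the measurements (omitted)
        throw new UnsupportedOperationException("Result assembly not shown in this sketch");
    }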
-
evaluate
protected void evaluate(SparkExecutionOperator operator, ChannelInstance[] inputs, ChannelInstance[] outputs)
-
createChannelInstance
protected static RddChannel.Instance createChannelInstance(org.apache.spark.api.java.JavaRDD<?> rdd, SparkExecutor sparkExecutor)
Creates a ChannelInstance that carries the given rdd.
-
createChannelInstance
protected static RddChannel.Instance createChannelInstance(SparkExecutor sparkExecutor)
Creates an empty ChannelInstance.
-
cleanUp
public void cleanUp()
Override this method to implement any clean-up logic.
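A hedged example of overriding the hook; cachedInputRdd is a hypothetical field, and unpersisting it is merely one plausible clean-up responsibility:

    @Override
    public void cleanUp() {
        super.cleanUp();
        if (this.cachedInputRdd != null) {    // hypothetical field holding profiling data
            this.cachedInputRdd.unpersist();  // evict the cached RDD from the cluster
            this.cachedInputRdd = null;
        }
    }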
-
-